package ins.framework.kafka;

import ins.framework.kafka.assignpartion.OffsetUtil;
import kafka.common.ErrorMapping;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.network.BlockingChannel;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.*;

/**
 * Runnable that consumes one or more Kafka topics on its own {@link KafkaConsumer}
 * instance and commits offsets per partition after processing each poll batch.
 *
 * <p>KafkaConsumer is NOT thread-safe, so every thread running this task must get
 * its own {@code ConsumePartions} instance (and therefore its own consumer).
 */
public class ConsumePartions implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumePartions.class);

    /** How long each poll blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000);

    private final List<String> topics;
    private final Map<String, Object> consumerConfig;
    // Index of this consumer within the group; historically used for manual
    // partition assignment (consumer.assign/seek), kept for diagnostics.
    private final int consumerIndex;

    /**
     * @param topics         topics to subscribe to (must be non-empty)
     * @param consumerConfig raw Kafka consumer configuration properties
     * @param consumerIndex  index of this consumer instance, for logging/assignment
     */
    public ConsumePartions(List<String> topics, Map<String, Object> consumerConfig, int consumerIndex) {
        this.topics = topics;
        this.consumerConfig = consumerConfig;
        this.consumerIndex = consumerIndex;
    }

    @Override
    public void run() {
        // One consumer per thread: KafkaConsumer is not thread-safe.
        // try-with-resources guarantees the consumer is closed on any exit path.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
            logger.debug("starting consumer index {}", consumerIndex);
            consumer.subscribe(topics);

            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records;
                try {
                    // Duration overload: poll(long) is deprecated in newer clients.
                    records = consumer.poll(POLL_TIMEOUT);
                } catch (SerializationException e) {
                    // NOTE(review): without seeking past the offending offset, the same
                    // poison record will be fetched again on the next poll; this mirrors
                    // the original behavior (retry/skip-nothing) but may spin on one record.
                    logger.warn("{} failed to deserialize a record, retrying poll",
                            Thread.currentThread().getName(), e);
                    continue;
                } catch (Exception e) {
                    // Unexpected failure: stop this consumer thread entirely.
                    logger.error("unexpected error while polling, stopping consumer", e);
                    return;
                }

                if (records.count() > 0) {
                    logger.debug("poll records size: {}", records.count());
                }

                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        logger.info("{}:partition:{}【{}】 {}【{}】",
                                Thread.currentThread().getName(), partition,
                                record.offset(), record.value(), record.key());
                    }
                    // Commit the offset AFTER the last processed record of this partition;
                    // committed offset must point at the next record to consume, hence +1.
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        }
    }
}
